﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using QQ2564874169.Core.Utils;

namespace QQ2564874169.MessageQueue
{
    /// <summary>
    /// Contract for a message-queue subscriber: callers register the queues they are
    /// interested in and receive messages through the <see cref="Received"/> event.
    /// </summary>
    public interface IMqSubscriber : IDisposable
    {
        /// <summary>
        /// Raised with each <see cref="MqMessage"/> delivered to this subscriber.
        /// </summary>
        event Action<MqMessage> Received;

        /// <summary>
        /// Starts listening to the named queue.
        /// </summary>
        /// <param name="queue">Name of the queue to listen to.</param>
        /// <param name="state">
        /// Optional caller-supplied value associated with the queue. In
        /// <c>AppMqSubscriber</c> it is passed to the <see cref="MqMessage"/> constructor
        /// for every message received on that queue.
        /// </param>
        void Consume(string queue, object state = null);

        /// <summary>
        /// Stops listening to a queue.
        /// </summary>
        /// <param name="queue">
        /// Name of the queue to leave. In <c>AppMqSubscriber</c> a null or empty value
        /// removes every queue the subscriber is listening to.
        /// </param>
        void Leave(string queue = null);
    }

    /// <summary>
    /// In-process <see cref="IMqSubscriber"/> that receives the batches announced by
    /// <c>AppMqPublisher.OnSend</c> and raises <see cref="Received"/> once per message.
    /// Messages whose <c>Reconsume</c> flag is set after the handlers ran are kept and
    /// delivered again by a background loop roughly once per second.
    /// </summary>
    /// <remarks>
    /// Threading: the static registry is guarded by locking <c>_subscribers</c>; all
    /// per-instance state is guarded by <c>_sync</c>. The two locks are never nested and
    /// neither is held while <see cref="Received"/> handlers execute.
    /// Exceptions thrown by handlers are not caught here: they propagate out of the
    /// delivery task or the re-consume loop exactly as the handlers raised them.
    /// </remarks>
    public class AppMqSubscriber : IMqSubscriber
    {
        // Every subscriber that has been constructed and not yet disposed.
        private static readonly IList<AppMqSubscriber> _subscribers = new List<AppMqSubscriber>();

        /// <summary>
        /// Raised for every message delivered to this subscriber, including re-deliveries.
        /// </summary>
        public event Action<MqMessage> Received;

        // Guards _state, _reconsumelist and _disposed.
        private readonly object _sync = new object();
        // Queue name -> caller-supplied state registered through Consume.
        private readonly Dictionary<string, object> _state = new Dictionary<string, object>();
        // Messages waiting to be delivered again by the re-consume loop.
        private readonly Queue<MqMessage> _reconsumelist = new Queue<MqMessage>();
        private bool _disposed;

        static AppMqSubscriber()
        {
            ExecuteReconsume();
            AppMqPublisher.OnSend += AppMqPublisher_OnSend;
        }

        /// <summary>
        /// Starts the background loop that re-delivers pending messages of every
        /// registered subscriber, then waits one second and repeats.
        /// </summary>
        private static void ExecuteReconsume()
        {
            TaskHelper.Run(() =>
            {
                while (true)
                {
                    Parallel.ForEach(SnapshotSubscribers(), sub => sub.RedeliverPending());
                    TaskHelper.Delay(1000).Wait();
                }
            })
            .ContinueWith(t =>
            {
                // The loop above never returns normally, so reaching this point means the
                // task ended abnormally (e.g. a handler threw); start a fresh loop.
                ExecuteReconsume();
            });
        }

        /// <summary>
        /// Handles <c>AppMqPublisher.OnSend</c>: schedules one delivery task for every
        /// subscriber currently listening to <paramref name="queue"/>.
        /// </summary>
        /// <param name="queue">Name of the queue the batch was sent to.</param>
        /// <param name="message">Message bodies in the batch.</param>
        private static void AppMqPublisher_OnSend(string queue, string[] message)
        {
            // Nothing can be subscribed to a null queue, and an empty batch has nothing to deliver.
            if (queue == null || message == null || message.Length == 0)
            {
                return;
            }

            foreach (var sub in SnapshotSubscribers())
            {
                if (!sub.IsConsuming(queue))
                {
                    continue;
                }

                var target = sub;
                Task.Factory.StartNew(() => target.Deliver(queue, message));
            }
        }

        /// <summary>
        /// Returns a copy of the subscriber registry so callers can iterate it without
        /// holding the registry lock.
        /// </summary>
        private static AppMqSubscriber[] SnapshotSubscribers()
        {
            lock (_subscribers)
            {
                return _subscribers.ToArray();
            }
        }

        /// <summary>
        /// Creates a subscriber and adds it to the registry so that it takes part in
        /// message dispatch and re-consumption until it is disposed.
        /// </summary>
        public AppMqSubscriber()
        {
            lock (_subscribers)
            {
                _subscribers.Add(this);
            }
        }

        /// <summary>
        /// Raises <see cref="Received"/> for <paramref name="msg"/>.
        /// </summary>
        /// <param name="msg">The message being delivered.</param>
        protected virtual void OnReceived(MqMessage msg)
        {
            Received?.Invoke(msg);
        }

        /// <summary>
        /// Starts listening to <paramref name="queue"/>.
        /// </summary>
        /// <param name="queue">Name of the queue to listen to.</param>
        /// <param name="state">
        /// Optional value passed to the <see cref="MqMessage"/> constructor for every
        /// message received on this queue.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
        /// <exception cref="ArgumentException">The queue is already being listened to.</exception>
        /// <exception cref="ObjectDisposedException">The subscriber has been disposed.</exception>
        public void Consume(string queue, object state = null)
        {
            if (queue == null)
            {
                throw new ArgumentNullException(nameof(queue));
            }

            lock (_sync)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(GetType().FullName);
                }
                if (_state.ContainsKey(queue))
                {
                    throw new ArgumentException($"{queue}已经在监听列表中。");
                }
                _state.Add(queue, state);
            }
        }

        /// <summary>
        /// Stops listening to <paramref name="queue"/>, or to every queue when it is null
        /// or empty. Leaving a queue that is not being listened to does nothing.
        /// Messages already waiting for re-consumption are not removed.
        /// </summary>
        /// <param name="queue">Queue to leave; null or empty means all queues.</param>
        public void Leave(string queue = null)
        {
            lock (_sync)
            {
                if (string.IsNullOrEmpty(queue))
                {
                    _state.Clear();
                }
                else
                {
                    // Remove simply returns false when the key is absent.
                    _state.Remove(queue);
                }
            }
        }

        /// <summary>
        /// Removes the subscriber from the registry, detaches all <see cref="Received"/>
        /// handlers and discards its queues and pending messages. Safe to call more than
        /// once and while deliveries are in flight on other threads.
        /// </summary>
        public void Dispose()
        {
            lock (_subscribers)
            {
                _subscribers.Remove(this);
            }

            lock (_sync)
            {
                // The collections are cleared rather than nulled: background threads may
                // still hold a reference to this instance from an earlier snapshot.
                _disposed = true;
                _state.Clear();
                _reconsumelist.Clear();
            }

            Received = null;
        }

        /// <summary>
        /// Tells whether this subscriber is currently listening to <paramref name="queue"/>.
        /// </summary>
        private bool IsConsuming(string queue)
        {
            lock (_sync)
            {
                return _state.ContainsKey(queue);
            }
        }

        /// <summary>
        /// Delivers every body in <paramref name="message"/> as an <see cref="MqMessage"/>
        /// for <paramref name="queue"/>. Does nothing when the queue was left (or the
        /// subscriber disposed) after the delivery was scheduled.
        /// </summary>
        private void Deliver(string queue, string[] message)
        {
            object state;
            lock (_sync)
            {
                if (!_state.TryGetValue(queue, out state))
                {
                    return;
                }
            }

            foreach (var body in message)
            {
                DeliverAndTrack(new MqMessage(queue, body, state));
            }
        }

        /// <summary>
        /// Takes every message currently waiting for re-consumption and delivers it again.
        /// </summary>
        private void RedeliverPending()
        {
            MqMessage[] pending;
            lock (_sync)
            {
                if (_reconsumelist.Count == 0)
                {
                    return;
                }
                pending = _reconsumelist.ToArray();
                _reconsumelist.Clear();
            }

            foreach (var msg in pending)
            {
                DeliverAndTrack(msg);
            }
        }

        /// <summary>
        /// Raises the receive event for <paramref name="msg"/> and, when its
        /// <c>Reconsume</c> flag is set afterwards, queues it for another delivery.
        /// </summary>
        private void DeliverAndTrack(MqMessage msg)
        {
            OnReceived(msg);
            if (!msg.Reconsume)
            {
                return;
            }

            lock (_sync)
            {
                // A disposed subscriber is never visited by the re-consume loop again,
                // so queuing the message would only keep it alive for nothing.
                if (!_disposed)
                {
                    _reconsumelist.Enqueue(msg);
                }
            }
        }
    }
}
